/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.oracle.logminer.parser;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.data.Envelope;
import io.debezium.relational.Table;

/**
 * A simple DML parser implementation specifically for Oracle LogMiner.
 *
 * The syntax of each DML operation is restricted to the format generated by Oracle LogMiner.  The
 * following are examples of each expected syntax:
 *
 * <pre>
 *     insert into "schema"."table"("C1","C2") values ('v1','v2');
 *     update "schema"."table" set "C1" = 'v1a', "C2" = 'v2a' where "C1" = 'v1' and "C2" = 'v2';
 *     delete from "schema"."table" where "C1" = 'v1' and "C2" = 'v2';
 * </pre>
 *
 * Certain data types are not emitted as string literals, such as {@code DATE} and {@code TIMESTAMP}.
 * For these data types, they're emitted as function calls.  The parser can detect this use case and
 * will emit the values for such columns as the explicit function call.
 *
 * Let's take the following {@code UPDATE} statement:
 *
 * <pre>
 *     update "schema"."table"
 *        set "C1" = TO_TIMESTAMP('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS')
 *      where "C1" = TO_TIMESTAMP('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS');
 * </pre>
 *
 * The new value for {@code C1} would be {@code TO_TIMESTAMP('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS')}.
 * The old value for {@code C1} would be {@code TO_TIMESTAMP('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')}.
 *
 * @author Chris Cranford
 */
public class LogMinerDmlParser implements DmlParser {

    // Quote character tested by removeSingleQuotes
    private static final String SINGLE_QUOTE = "'";
    // Keyword compared against unquoted values to detect a SQL NULL
    private static final String NULL = "NULL";
    // Statement prefixes; within this class only their lengths are used, to skip the leading keyword(s)
    private static final String INSERT_INTO = "insert into ";
    private static final String UPDATE = "update ";
    private static final String DELETE_FROM = "delete from ";
    // Where-clause connectives, matched in lower case and including the trailing space
    private static final String AND = "and ";
    private static final String OR = "or ";
    // Clause keywords, matched including their surrounding spaces
    private static final String SET = " set ";
    private static final String WHERE = " where ";
    private static final String VALUES = " values ";
    // Where-clause predicate that is recorded as a null column value
    private static final String IS_NULL = "IS NULL";
    // Used by Oracle for specific data types that cannot be represented in SQL.
    // UNSUPPORTED is the first word of UNSUPPORTED_TYPE: the set/where parsers split unquoted values on
    // spaces, so on seeing only "Unsupported" they keep scanning until the full marker has been read.
    private static final String UNSUPPORTED = "Unsupported";
    private static final String UNSUPPORTED_TYPE = "Unsupported Type";

    // Precomputed keyword lengths used to advance the parse position
    private static final int INSERT_INTO_LENGTH = INSERT_INTO.length();
    private static final int UPDATE_LENGTH = UPDATE.length();
    private static final int DELETE_FROM_LENGTH = DELETE_FROM.length();
    private static final int VALUES_LENGTH = VALUES.length();
    private static final int SET_LENGTH = SET.length();
    private static final int WHERE_LENGTH = WHERE.length();

    /**
     * Parses a DML statement by dispatching on its first character: {@code i} for insert,
     * {@code u} for update and {@code d} for delete.
     *
     * @param sql the sql statement
     * @param table the table the statement applies to, must not be {@code null}
     * @param txId the transaction identifier; not used by this implementation
     * @return the parsed DML entry
     * @throws DmlParserException if the table is {@code null}, if the statement is {@code null},
     *         empty or starts with any other character, or if parsing the statement fails
     */
    @Override
    public LogMinerDmlEntry parse(String sql, Table table, String txId) {
        if (table == null) {
            throw new DmlParserException("DML parser requires a non-null table");
        }

        // a null or empty statement maps to a character that matches no case label
        final char operation = (sql == null || sql.isEmpty()) ? '\0' : sql.charAt(0);
        switch (operation) {
            case 'i':
                return parseInsert(sql, table);
            case 'u':
                return parseUpdate(sql, table);
            case 'd':
                return parseDelete(sql, table);
            default:
                throw new DmlParserException("Unknown supported SQL '" + sql + "'");
        }
    }

    /**
     * Parse an {@code INSERT} SQL statement.
     *
     * The resulting entry holds one new value per column of {@code table}, in table column order.
     * Columns that do not appear in the statement's column list get a value holder without data.
     *
     * @param sql the sql statement
     * @param table the table
     * @return the parsed DML entry record
     * @throws DmlParserException if the statement cannot be parsed, including when the number of
     *         column names differs from the number of values
     */
    private LogMinerDmlEntry parseInsert(String sql, Table table) {
        try {
            // advance beyond "insert into " and the table name
            int index = parseTableName(sql, INSERT_INTO_LENGTH);

            // capture column names
            final List<String> columnNames = new ArrayList<>();
            index = parseColumnListClause(sql, index, columnNames);

            // capture values; nothing follows the values clause so its end index is not needed
            final List<String> columnValues = new ArrayList<>(columnNames.size());
            parseColumnValuesClause(sql, index, columnValues);

            if (columnNames.size() != columnValues.size()) {
                throw new DmlParserException("Columns: " + columnNames + ", Values: " + columnValues);
            }

            final Map<String, String> columnMap = createMap(columnNames, columnValues);

            // one entry per table column, so size by the table rather than by the statement
            final int columnCount = table.columns().size();
            final List<LogMinerColumnValue> newValues = new ArrayList<>(columnCount);
            for (int i = 0; i < columnCount; ++i) {
                final String columnName = table.columns().get(i).name();
                newValues.add(createColumnValue(columnName, columnMap.get(columnName)));
            }

            return new LogMinerDmlEntryImpl(Envelope.Operation.CREATE, newValues, Collections.emptyList());
        }
        catch (Exception e) {
            throw new DmlParserException("Failed to parse insert DML: '" + sql + "'", e);
        }
    }

    /**
     * Pairs each key with the value at the same position, preserving the order of the keys.
     *
     * @param keys the keys
     * @param values the values; must hold at least as many elements as {@code keys}
     * @return an insertion-ordered map of keys to values; a repeated key keeps its last value
     */
    private static <K, V> Map<K, V> createMap(List<K> keys, List<V> values) {
        final Map<K, V> pairs = new LinkedHashMap<>(keys.size());
        int position = 0;
        for (K key : keys) {
            pairs.put(key, values.get(position));
            position++;
        }
        return pairs;
    }

    /**
     * Parse an {@code UPDATE} SQL statement.
     *
     * The before-image is built from the {@code WHERE} clause and the after-image from the
     * {@code SET} clause. Both images hold one value per column of {@code table}; in the
     * after-image, columns absent from the {@code SET} clause carry their {@code WHERE} value.
     * An image is empty when its clause yielded no columns.
     *
     * @param sql the sql statement
     * @param table the table
     * @return the parsed DML entry record
     * @throws DmlParserException if the statement cannot be parsed
     */
    private LogMinerDmlEntry parseUpdate(String sql, Table table) {
        try {
            // step over "update " and the table name
            final int setClauseStart = parseTableName(sql, UPDATE_LENGTH);

            // columns assigned by the set clause
            final List<String> changedNames = new ArrayList<>();
            final List<String> changedValues = new ArrayList<>();
            final int whereClauseStart = parseSetClause(sql, setClauseStart, changedNames, changedValues);

            // columns matched by the where clause
            final List<String> priorNames = new ArrayList<>();
            final List<String> priorValues = new ArrayList<>();
            parseWhereClause(sql, whereClauseStart, priorNames, priorValues);

            final Map<String, String> before = createMap(priorNames, priorValues);
            final Map<String, String> after = createMap(changedNames, changedValues);

            // before-image
            final List<LogMinerColumnValue> oldValues;
            if (before.isEmpty()) {
                oldValues = Collections.emptyList();
            }
            else {
                oldValues = new ArrayList<>(table.columns().size());
                table.columns().forEach(column -> {
                    final String name = column.name();
                    oldValues.add(createColumnValue(name, before.get(name)));
                });
            }

            // after-image
            final List<LogMinerColumnValue> newValues;
            if (after.isEmpty()) {
                newValues = Collections.emptyList();
            }
            else {
                newValues = new ArrayList<>(table.columns().size());
                table.columns().forEach(column -> {
                    final String name = column.name();
                    // assigned columns take the set value, the rest keep the where value
                    final Map<String, String> source = after.containsKey(name) ? after : before;
                    final LogMinerColumnValue value = new LogMinerColumnValueImpl(name, 0);
                    value.setColumnData(source.get(name));
                    newValues.add(value);
                });
            }

            return new LogMinerDmlEntryImpl(Envelope.Operation.UPDATE, newValues, oldValues);
        }
        catch (Exception e) {
            throw new DmlParserException("Failed to parse update DML: '" + sql + "'", e);
        }
    }

    /**
     * Parses a SQL {@code DELETE} statement.
     *
     * The before-image is built from the {@code WHERE} clause and holds one value per column of
     * {@code table}, in table column order. It is empty when the clause yielded no columns.
     *
     * @param sql the sql statement
     * @param table the table
     * @return the parsed DML entry record
     * @throws DmlParserException if the statement cannot be parsed
     */
    private LogMinerDmlEntry parseDelete(String sql, Table table) {
        try {
            // advance beyond "delete from " and the table name
            final int index = parseTableName(sql, DELETE_FROM_LENGTH);

            // parse where
            final List<String> columnNames = new ArrayList<>();
            final List<String> columnValues = new ArrayList<>();
            parseWhereClause(sql, index, columnNames, columnValues);

            final Map<String, String> beforeColumnMap = createMap(columnNames, columnValues);

            final List<LogMinerColumnValue> oldValues;
            if (beforeColumnMap.isEmpty()) {
                oldValues = Collections.emptyList();
            }
            else {
                // one entry per table column, so size by the table rather than by the where clause
                final int columnCount = table.columns().size();
                oldValues = new ArrayList<>(columnCount);
                for (int i = 0; i < columnCount; ++i) {
                    final String columnName = table.columns().get(i).name();
                    oldValues.add(createColumnValue(columnName, beforeColumnMap.get(columnName)));
                }
            }

            return new LogMinerDmlEntryImpl(Envelope.Operation.DELETE, Collections.emptyList(), oldValues);
        }
        catch (Exception e) {
            throw new DmlParserException("Failed to parse delete DML: '" + sql + "'", e);
        }
    }

    /**
     * Skips over the table name, which ends at the first space or opening parenthesis that is
     * not enclosed in double quotes.
     *
     * @param sql the sql statement
     * @param index the index into the sql statement to begin parsing
     * @return the index of the terminating space or parenthesis, or the length of the statement
     *         if no terminator was found
     */
    private int parseTableName(String sql, int index) {
        boolean quoted = false;

        while (index < sql.length()) {
            final char ch = sql.charAt(index);
            if (ch == '"') {
                // every double quote flips between inside and outside of an identifier
                quoted = !quoted;
            }
            else if (!quoted && (ch == ' ' || ch == '(')) {
                return index;
            }
            index++;
        }

        return index;
    }

    /**
     * Parse an {@code INSERT} statement's column-list clause.
     *
     * Column names are expected to be double-quoted and separated by a single comma with no
     * whitespace, e.g. {@code ("C1","C2")}; the position arithmetic below relies on that layout.
     *
     * @param sql the sql statement
     * @param start the index into the sql statement to begin parsing
     * @param columnNames the list that will be populated with the column names
     * @return the index just past the closing parenthesis of the column list, or the length of
     *         the statement if no closing parenthesis was found
     */
    private int parseColumnListClause(String sql, int start, List<String> columnNames) {
        boolean quoted = false;
        int position = start;

        while (position < sql.length()) {
            final char ch = sql.charAt(position);
            if (ch == '"') {
                if (quoted) {
                    // start sits on the opening quote, so the name begins one character later
                    columnNames.add(sql.substring(start + 1, position));
                    // step over the closing quote and the separator to the next opening quote
                    start = position + 2;
                }
                quoted = !quoted;
            }
            else if (!quoted) {
                if (ch == '(') {
                    start = position + 1;
                }
                else if (ch == ')') {
                    return position + 1;
                }
            }
            position++;
        }

        return position;
    }

    /**
     * Parse an {@code INSERT} statement's column-values clause.
     *
     * Values are split on commas and the closing parenthesis that are neither inside a
     * single-quoted literal nor inside a nested function call. Enclosing single quotes are
     * removed from a value, and the {@code Unsupported Type} marker is recorded as {@code null}.
     * A doubled single quote inside a literal closes and immediately re-opens the literal, so it
     * is kept verbatim in the captured value.
     *
     * @param sql the sql statement
     * @param start the index into the sql statement to begin parsing
     * @param columnValues the list that will be populated with the column values
     * @return the index into the sql string where the column-values clause ended
     * @throws DebeziumException if the statement does not continue with the values keyword
     */
    private int parseColumnValuesClause(String sql, int start, List<String> columnValues) {
        // verify entering values-clause; startsWith also copes with a statement that ends early
        if (!sql.startsWith(VALUES, start)) {
            throw new DebeziumException("Failed to parse DML: " + sql);
        }

        int index = start + VALUES_LENGTH;
        int nested = 0;
        boolean inQuote = false;
        boolean inValues = false;

        for (; index < sql.length(); ++index) {
            char c = sql.charAt(index);
            if (c == '(' && !inQuote && !inValues) {
                // opening parenthesis of the value list itself
                inValues = true;
                start = index + 1;
            }
            else if (c == '(' && !inQuote) {
                // opening parenthesis of a function call used as a value
                nested++;
            }
            else if (c == '\'') {
                inQuote = !inQuote;
            }
            else if (!inQuote && (c == ',' || c == ')')) {
                if (nested != 0) {
                    // still inside a function call: its commas are arguments, its ')' closes it
                    if (c == ')') {
                        nested--;
                    }
                    continue;
                }
                String value = removeSingleQuotes(sql.substring(start, index));
                columnValues.add(value.equals(UNSUPPORTED_TYPE) ? null : value);
                start = index + 1;
            }
        }

        return index;
    }

    /**
     * Parse an {@code UPDATE} statement's {@code SET} clause.
     *
     * Column names are double-quoted. A value is either a single-quoted literal, captured without
     * its enclosing quotes, or an unquoted token such as {@code NULL} or a function call, captured
     * verbatim. {@code NULL} and {@code Unsupported Type} are recorded as {@code null}. A doubled
     * single quote inside a literal is an escaped quote and does not end the literal; it is kept
     * verbatim in the captured value, matching how the {@code INSERT} values clause captures it.
     *
     * @param sql the sql statement
     * @param start the index into the sql statement to begin parsing
     * @param columnNames the list of the changed column names that will be populated
     * @param columnValues the list of the changed column values that will be populated
     * @return the index of the space preceding the {@code where} keyword, or the length of the
     *         statement if there is no where-clause
     * @throws DebeziumException if the statement does not continue with the set keyword
     */
    private int parseSetClause(String sql, int start, List<String> columnNames, List<String> columnValues) {
        boolean inDoubleQuote = false;
        boolean inSingleQuote = false;
        boolean inColumnName = true;
        boolean inColumnValue = false;
        boolean inSpecial = false;
        int nested = 0;

        // verify entering set-clause; startsWith also copes with a statement that ends early
        if (!sql.startsWith(SET, start)) {
            throw new DebeziumException("Failed to parse DML: " + sql);
        }
        start += SET_LENGTH;

        int index = start;
        for (; index < sql.length(); ++index) {
            char c = sql.charAt(index);
            char lookAhead = (index + 1 < sql.length()) ? sql.charAt(index + 1) : 0;
            if (c == '"' && inColumnName) {
                // Set clause column names are double-quoted
                if (inDoubleQuote) {
                    inDoubleQuote = false;
                    columnNames.add(sql.substring(start + 1, index));
                    start = index + 1;
                    inColumnName = false;
                    continue;
                }
                inDoubleQuote = true;
                start = index;
            }
            else if (c == '=' && !inColumnName && !inColumnValue) {
                inColumnValue = true;
                // Oracle SQL generated is always ' = ', skipping following space
                index += 1;
                start = index + 1;
            }
            else if (c == '\'' && inColumnValue) {
                // Set clause single-quoted column value
                if (inSingleQuote && lookAhead == '\'') {
                    // doubled quote is an escaped quote inside the literal, not its end
                    index += 1;
                    continue;
                }
                if (inSingleQuote) {
                    inSingleQuote = false;
                    if (nested == 0) {
                        columnValues.add(sql.substring(start + 1, index));
                        start = index + 1;
                        inColumnValue = false;
                        inColumnName = false;
                    }
                    continue;
                }
                if (!inSpecial) {
                    start = index;
                }
                inSingleQuote = true;
            }
            else if (c == ',' && !inColumnValue && !inColumnName) {
                // Set clause uses ', ' skip following space
                inColumnName = true;
                index += 1;
                start = index;
            }
            else if (inColumnValue && !inSingleQuote) {
                // unquoted value such as NULL or a function call; remember where it began
                if (!inSpecial) {
                    start = index;
                    inSpecial = true;
                }
                // characters as a part of the value
                if (c == '(') {
                    nested++;
                }
                else if (c == ')' && nested > 0) {
                    nested--;
                }
                else if ((c == ',' || c == ' ' || c == ';') && nested == 0) {
                    String value = sql.substring(start, index);
                    if (value.equals(NULL) || value.equals(UNSUPPORTED_TYPE)) {
                        columnValues.add(null);
                        start = index + 1;
                        inColumnValue = false;
                        inSpecial = false;
                        inColumnName = true;
                        continue;
                    }
                    else if (value.equals(UNSUPPORTED)) {
                        // first word of "Unsupported Type"; keep scanning past the space
                        continue;
                    }
                    columnValues.add(value);
                    start = index + 1;
                    inColumnValue = false;
                    inSpecial = false;
                    inColumnName = true;
                }
            }
            else if (!inDoubleQuote && !inSingleQuote) {
                // the set-clause ends where " where " begins; hand back the index of its leading space
                if (c == 'w' && sql.startsWith(WHERE, index - 1)) {
                    index -= 1;
                    break;
                }
            }
        }

        return index;
    }

    /**
     * Parses a {@code WHERE} clause and populates the provided column names and values lists.
     *
     * Column names are double-quoted. A value is either a single-quoted literal, captured without
     * its enclosing quotes, or an unquoted token such as a function call, captured verbatim.
     * {@code IS NULL}, {@code NULL} and {@code Unsupported Type} are recorded as {@code null}.
     * A doubled single quote inside a literal is an escaped quote and does not end the literal;
     * it is kept verbatim in the captured value. Conditions are joined by lower-case
     * {@code and} / {@code or}.
     *
     * @param sql the sql statement
     * @param start the index into the sql statement to begin parsing
     * @param columnNames the column names parsed from the clause
     * @param columnValues the column values parsed from the clause
     * @return the index into the sql string to continue parsing
     * @throws DebeziumException if text remains at {@code start} that does not begin with the
     *         where keyword
     */
    private int parseWhereClause(String sql, int start, List<String> columnNames, List<String> columnValues) {
        int nested = 0;
        boolean inColumnName = true;
        boolean inColumnValue = false;
        boolean inDoubleQuote = false;
        boolean inSingleQuote = false;
        boolean inSpecial = false;

        // DBZ-3235
        // LogMiner can generate SQL without a WHERE condition under some circumstances and if it does
        // we shouldn't immediately fail DML parsing.
        if (start >= sql.length()) {
            return start;
        }

        // verify entering where-clause; startsWith also copes with a statement that ends early
        if (!sql.startsWith(WHERE, start)) {
            throw new DebeziumException("Failed to parse DML: " + sql);
        }
        start += WHERE_LENGTH;

        int index = start;
        for (; index < sql.length(); ++index) {
            char c = sql.charAt(index);
            char lookAhead = (index + 1 < sql.length()) ? sql.charAt(index + 1) : 0;
            if (c == '"' && inColumnName) {
                // Where clause column names are double-quoted
                if (inDoubleQuote) {
                    inDoubleQuote = false;
                    columnNames.add(sql.substring(start + 1, index));
                    start = index + 1;
                    inColumnName = false;
                    continue;
                }
                inDoubleQuote = true;
                start = index;
            }
            else if (c == '=' && !inColumnName && !inColumnValue) {
                inColumnValue = true;
                // Oracle SQL generated is always ' = ', skipping following space
                index += 1;
                start = index + 1;
            }
            else if (c == 'I' && !inColumnName && !inColumnValue) {
                if (sql.startsWith(IS_NULL, index)) {
                    columnValues.add(null);
                    // land on the last character of the keyword; the loop increment steps past it
                    index += IS_NULL.length() - 1;
                    start = index;
                    continue;
                }
            }
            else if (c == '\'' && inColumnValue) {
                // Where clause single-quoted column value
                if (inSingleQuote && lookAhead == '\'') {
                    // doubled quote is an escaped quote inside the literal, not its end
                    index += 1;
                    continue;
                }
                if (inSingleQuote) {
                    inSingleQuote = false;
                    if (nested == 0) {
                        columnValues.add(sql.substring(start + 1, index));
                        start = index + 1;
                        inColumnValue = false;
                        inColumnName = false;
                    }
                    continue;
                }
                if (!inSpecial) {
                    start = index;
                }
                inSingleQuote = true;
            }
            else if (inColumnValue && !inSingleQuote) {
                // unquoted value such as NULL or a function call; remember where it began
                if (!inSpecial) {
                    start = index;
                    inSpecial = true;
                }
                if (c == '(') {
                    nested++;
                }
                else if (c == ')' && nested > 0) {
                    nested--;
                }
                else if ((c == ';' || c == ' ') && nested == 0) {
                    String value = sql.substring(start, index);
                    if (value.equals(NULL) || value.equals(UNSUPPORTED_TYPE)) {
                        columnValues.add(null);
                        start = index + 1;
                        inColumnValue = false;
                        inSpecial = false;
                        inColumnName = true;
                        continue;
                    }
                    else if (value.equals(UNSUPPORTED)) {
                        // first word of "Unsupported Type"; keep scanning past the space
                        continue;
                    }
                    columnValues.add(value);
                    start = index + 1;
                    inColumnValue = false;
                    inSpecial = false;
                    inColumnName = true;
                }
            }
            else if (!inColumnValue && !inColumnName) {
                // between conditions: a connective announces the next column name
                if (c == 'a' && sql.startsWith(AND, index)) {
                    index += AND.length() - 1;
                    start = index;
                    inColumnName = true;
                }
                else if (c == 'o' && sql.startsWith(OR, index)) {
                    index += OR.length() - 1;
                    start = index;
                    inColumnName = true;
                }
            }
        }

        return index;
    }

    /**
     * Remove {@code '} quotes from around the provided text if they exist; otherwise the value is returned as-is.
     *
     * Text consisting of a single quote character has no enclosing pair and is returned unchanged.
     *
     * @param text the text to remove single quotes from, must not be {@code null}
     * @return the text with the enclosing single quotes removed
     */
    private static String removeSingleQuotes(String text) {
        // a lone quote both starts and ends with "'" but there is nothing between a pair to extract
        if (text.length() >= 2 && text.startsWith(SINGLE_QUOTE) && text.endsWith(SINGLE_QUOTE)) {
            return text.substring(1, text.length() - 1);
        }
        return text;
    }

    /**
     * Builds a {@link LogMinerColumnValue} for a column, attaching the data only when there is some.
     *
     * A {@code null} value or the text {@code NULL} leaves the holder without column data.
     * NOTE(review): the insert values parser strips enclosing quotes before calling this method, so
     * a quoted string literal {@code 'NULL'} looks the same as the keyword here - confirm whether
     * that is acceptable.
     *
     * @param columnName the column name
     * @param columnValue the column value, may be {@code null}
     * @return the LogMiner column value object
     */
    private static LogMinerColumnValue createColumnValue(String columnName, String columnValue) {
        final LogMinerColumnValue holder = new LogMinerColumnValueImpl(columnName, 0);
        if (columnValue == null || NULL.equals(columnValue)) {
            return holder;
        }
        holder.setColumnData(columnValue);
        return holder;
    }
}
